Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamline the database interfaces and add pgx adapter #29

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from

Conversation

julesjcraske
Copy link

@julesjcraske julesjcraske commented Feb 8, 2024

I am opening this draft PR to see if you are interested in this direction for the library (trimming down the interfaces to only what is used). If so I can put the effort in to get it up to your required standards and resolve the final issues. If not I can close it and keep my fork.

Thanks for all the great work you have done on the lib so far!

I want to use watermill to fulfil the outbox pattern (persisting events that will be asynchronously broadcast as part of the main persistence transaction) but my company mainly uses the pgx library.

This change makes this library more flexible to allow for the use of the pgx library. I can keep the pgx adapter itself elsewhere, outside of this repo if you would like.

  • Strips out all of the unused methods declared in the database Beginner and Executor (totally unused) and ContextExecutor interfaces.
  • Adds interfaces for the Exec Result type and the Query Rows type.
  • Adds small plumbing adapters for the StdSQL and Pgx types

Still to do:

  • Make it so that it is backwards compatible initialising a Publisher or Subscriber with a raw StdSQL connection or transaction.
  • Add to documentation.
  • Add tests against a pgx adapter.

pkg/sql/sql_adapter.go Outdated Show resolved Hide resolved
@m110
Copy link
Member

m110 commented Aug 23, 2024

Thanks for contributing @julesjcraske and I'm sorry it took us so long to reply!

This looks great, and I since we're getting more people asking about using specific database frameworks with the outbox pattern, I'm happy to go ahead with it. Since you changed the public interfaces anyway, I think we can't keep backward compatibility and will need to bump major version, unless you see some way?

lyubchev added a commit to Integer-Technologies-B-V/watermill-sql that referenced this pull request Aug 26, 2024
@julesjcraske
Copy link
Author

julesjcraske commented Sep 3, 2024

I think we can't keep backward compatibility and will need to bump major version, unless you see some way?

Thanks for taking a look, its great to here you are interested in the concept. I will try and find time to explore the options backwards compat and finishing touches on the PR over the next few weeks.

Comment on lines 81 to 93
func (t Tx) Rollback() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

return t.Tx.Rollback(ctx)
}

func (t Tx) Commit() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

return t.Tx.Commit(ctx)
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the stdSQL interface this interface uses doesn't take a context, it looks like I decided to create a context and add a timeout here.

I don't want to refactor too much to make context available here in this PR.

  1. I could just use a background context with no timeout
  2. I could make the timeout configurable when creating this adapter
  3. I could keep a hardcoded default timeout as it is currently (I don't like this as much)

@julesjcraske
Copy link
Author

Update: I have created a pull request on the pgxtype repository to add the xid8 type that is currently used in the postgres schema. jackc/pgtype#220

There are still some more fixes required and issues to resolve after that, but that gets the majority of the tests passing with the new adapters.

@m110
Copy link
Member

m110 commented Oct 10, 2024

Hey @julesjcraske, I see the xid8 is supported natively in pgx v5. Should we stick to this version then?

@davideimola
Copy link

Hey @julesjcraske and @m110,

I have tried to create a fork for pgx and upgrade everything to v5 which works just fine.

If you want to take a look I put everything on this repo.

https://github.com/davideimola/watermill-pgx

@julesjcraske
Copy link
Author

Upgrading pgx to v5 in this PR was easy actually thanks.

The problem I have had is that with pgx even with the same DefaultPostgresqlSchema, the subscriber initialising queries that create tables have to be run inside a transaction otherwise I get the errors:

    test_pubsub.go:347: could not initialize schema: ERROR: duplicate key value violates unique constraint "pg_class_relname_nsp_index" (SQLSTATE 23505)
    test_pubsub.go:347: could not initialize schema: ERROR: duplicate key value violates unique constraint "pg_class_relname_nsp_index" (SQLSTATE 23505)
error on publish: could not insert message as row: ERROR: relation "test_pgx_topic_83ad5299-10bf-406f-bf15-87c3161443be_11" does not exist (SQLSTATE 42P01), 4 retries left
error on publish: could not insert message as row: ERROR: relation "test_pgx_topic_83ad5299-10bf-406f-bf15-87c3161443be_7" does not exist (SQLSTATE 42P01), 4 retries left

I added a mechanism to run schema transactions inside a transaction and the tests then run. There might be another way to solve it? I followed advice in https://stackoverflow.com/questions/74261789/postgres-create-table-if-not-exists-%E2%87%92-23505

But now the schemas initialise, when querying for messages I get the error logs:

[watermill] 2024/10/16 09:50:53.895518 subscriber.go:280: 	level=ERROR msg="could not commit tx for querying message" consumer_group=test err="commit unexpectedly resulted in rollback" subscriber_id=01JAA8BPYF5TBCJ0ZJBQNBMSHE topic=topic_f8ed06a2-10d3-4f92-bbb7-062b0f70e5c2 

These see, rather dangerous as the tests pass, but errors are only logged. Shouldn't the transaction commit be outside the defer?

@m110
Copy link
Member

m110 commented Oct 16, 2024

Thanks for looking into it @julesjcraske!

Wouldn't this indicate that two subscribers, in parallel, try to create a table with the same name? 🤔 I guess this could happen if tests initialize the schema with both the publisher and subscriber. Perhaps we could adjust how they do it instead.

@julesjcraske
Copy link
Author

Thanks for looking into it @julesjcraske!

Wouldn't this indicate that two subscribers, in parallel, try to create a table with the same name? 🤔 I guess this could happen if tests initialize the schema with both the publisher and subscriber. Perhaps we could adjust how they do it instead.

The weird thing is, the Postgres with standard SQL adapter tests work fine, so it doesn't seem to actually be a database issue but an issue with the library used to connect. It definitely warrants more investigation.

@davideimola
Copy link

Hi @m110 and @julesjcraske!
Is there any way that I could help with this PR? I needed to solve my compatibility issues as I am actually running a custom implementation instead of the actual library!

@julesjcraske
Copy link
Author

julesjcraske commented Nov 12, 2024 via email

@m110
Copy link
Member

m110 commented Nov 18, 2024

Hey @julesjcraske. I looked into the logged errors, and as far as I understand, they could also be happening before your changes, as we wouldn't usually log them if the tests pass correctly. 🤔 So perhaps this is unrelated?

@julesjcraske
Copy link
Author

julesjcraske commented Nov 27, 2024

Hey @julesjcraske. I looked into the logged errors, and as far as I understand, they could also be happening before your changes, as we wouldn't usually log them if the tests pass correctly. 🤔 So perhaps this is unrelated?

Yeah i noticed the logs are don't actually cause a test failure.

You are right I did seem to get all the tests passing by adding the transaction around the schema migrations.

I have just added a small improvement to duplicated code, and added a backward compatibility fix for NewSubscriber, introducing NewSubscriberV2 with new interface. I could also make NewSubscriber accept a new interface value as well, but that would encourage its usage. Alternatively you might prefer to make a backward incompatible change, but I don't think it is worth it if it is this easily avoidable?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants